package com.vcs.basicserviceprovider.message;

import com.vcs.basicserviceprovider.stream.EsChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;


/**
 * Receives messages from the {@link EsChannel#ES_ALARM_INPUT} and
 * {@link EsChannel#MQ_ALARM_INPUT} channels and stores every received message in a
 * shared in-memory list that can be read through {@link #getMessages()}.
 *
 * <p>Project: spring-cloud-fosun<br>
 * Package: com.vcs.basicserviceprovider.message<br>
 * Class: BasicMessageByKafka<br>
 * Email: 853632587@qq.com<br>
 * Description: receives Kafka messages<br>
 * Date: 2020/12/8 14:13
 *
 * <p>NOTE(review): the message list is never trimmed by this class, so it grows for as
 * long as messages keep arriving unless a caller clears the list returned by
 * {@link #getMessages()}.
 *
 * @author zhangs
 * @version 1.0
 */
// NOTE(review): @EnableBinding for EsChannel is not declared on this class (it was
// commented out); presumably the binding is enabled elsewhere — confirm.
@Component
public class BasicMessageByKafka {

    private static final Logger logger = LoggerFactory.getLogger(BasicMessageByKafka.class);

    /**
     * Every message received so far, in arrival order.
     *
     * <p>A {@link CopyOnWriteArrayList} is used because the list is static, is appended to
     * by several listener callbacks, and is handed out as-is by {@link #getMessages()}:
     * appends are atomic and readers iterate over a stable snapshot instead of racing
     * with a plain {@code ArrayList}.
     */
    private static final List<Object> tempList = new CopyOnWriteArrayList<>();

    /**
     * Handles a message from {@link EsChannel#ES_ALARM_INPUT}, subscribed via
     * {@code @ServiceActivator}.
     *
     * <p>NOTE(review): {@link #onMessage(Message)} subscribes to the same channel; confirm
     * that having two subscribers on this channel is intended.
     *
     * @param data the received message
     */
    @ServiceActivator(inputChannel = EsChannel.ES_ALARM_INPUT)
    public void listen(Message<String> data) {
        logger.info("Subscribe by @ServiceActivator");
        process(data);
    }

    /**
     * Handles a message from {@link EsChannel#ES_ALARM_INPUT}, subscribed via
     * {@code @StreamListener}.
     *
     * @param data the received message
     */
    @StreamListener(EsChannel.ES_ALARM_INPUT)
    public void onMessage(Message<String> data) {
        logger.info("Subscribe by @StreamListener");
        process(data);
    }

    /**
     * Handles a message from {@link EsChannel#MQ_ALARM_INPUT}, subscribed via
     * {@code @StreamListener}.
     *
     * @param data the received message
     */
    @StreamListener(EsChannel.MQ_ALARM_INPUT)
    public void onMessageByMq(Message<String> data) {
        logger.info("Subscribe by @StreamListener");
        process(data);
    }

    /**
     * Appends the received message (the whole {@link Message}, not only its payload) to
     * the shared list.
     *
     * @param data the received message
     */
    private void process(Message<String> data) {
        // No try/catch needed: appending to a CopyOnWriteArrayList does not fail under
        // concurrent access the way an unsynchronized ArrayList can.
        tempList.add(data);
    }

    /**
     * Returns the collection of received messages.
     *
     * <p>The returned list is the live backing list, not a copy: messages received later
     * become visible through it, and changes made by the caller (for example
     * {@code clear()}) affect what this class stores. Iterators over it work on a snapshot
     * and do not support {@code remove()}.
     *
     * @return the live list of all messages received so far, in arrival order
     */
    public static List<Object> getMessages() {
        return tempList;
    }

}
